#include <xrpld/app/ledger/TransactionMaster.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/SHAMapStoreImp.h>
#include <xrpld/app/rdb/State.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/core/ConfigSections.h>

#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Scheduler.h>
#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
#include <xrpl/shamap/SHAMapMissingNode.h>

#include <boost/algorithm/string/predicate.hpp>

namespace ripple {
void
SHAMapStoreImp::SavedStateDB::init(
    BasicConfig const& config,
    std::string const& dbName)
{
    std::lock_guard lock(mutex_);
    initStateDB(sqlDb_, config, dbName);
}

LedgerIndex
SHAMapStoreImp::SavedStateDB::getCanDelete()
{
    std::lock_guard lock(mutex_);

    return ripple::getCanDelete(sqlDb_);
}

LedgerIndex
SHAMapStoreImp::SavedStateDB::setCanDelete(LedgerIndex canDelete)
{
    std::lock_guard lock(mutex_);

    return ripple::setCanDelete(sqlDb_, canDelete);
}

SavedState
SHAMapStoreImp::SavedStateDB::getState()
{
    std::lock_guard lock(mutex_);

    return ripple::getSavedState(sqlDb_);
}

void
SHAMapStoreImp::SavedStateDB::setState(SavedState const& state)
{
    std::lock_guard lock(mutex_);
    ripple::setSavedState(sqlDb_, state);
}

void
SHAMapStoreImp::SavedStateDB::setLastRotated(LedgerIndex seq)
{
    std::lock_guard lock(mutex_);
    ripple::setLastRotated(sqlDb_, seq);
}

//------------------------------------------------------------------------------

SHAMapStoreImp::SHAMapStoreImp(
    Application& app,
    NodeStore::Scheduler& scheduler,
    beast::Journal journal)
    : app_(app)
    , scheduler_(scheduler)
    , journal_(journal)
    , working_(true)
    , canDelete_(std::numeric_limits<LedgerIndex>::max())
{
    Config& config{app.config()};

    Section& section{config.section(ConfigSection::nodeDatabase())};
    if (section.empty())
    {
        Throw<std::runtime_error>(
            "Missing [" + ConfigSection::nodeDatabase() +
            "] entry in configuration file");
    }

    // RocksDB only. Use sensible defaults if no values specified.
    if (boost::iequals(get(section, "type"), "RocksDB"))
    {
        if (!section.exists("cache_mb"))
        {
            section.set(
                "cache_mb",
                std::to_string(config.getValueFor(SizedItem::hashNodeDBCache)));
        }

        if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2))
            section.set("filter_bits", "10");
    }

    get_if_exists(section, "online_delete", deleteInterval_);

    if (deleteInterval_)
    {
        // Configuration that affects the behavior of online delete
        get_if_exists(section, "delete_batch", deleteBatch_);
        std::uint32_t temp;
        if (get_if_exists(section, "back_off_milliseconds", temp) ||
            // Included for backward compaibility with an undocumented setting
            get_if_exists(section, "backOff", temp))
        {
            backOff_ = std::chrono::milliseconds{temp};
        }
        if (get_if_exists(section, "age_threshold_seconds", temp))
            ageThreshold_ = std::chrono::seconds{temp};
        if (get_if_exists(section, "recovery_wait_seconds", temp))
            recoveryWaitTime_ = std::chrono::seconds{temp};

        get_if_exists(section, "advisory_delete", advisoryDelete_);

        auto const minInterval = config.standalone()
            ? minimumDeletionIntervalSA_
            : minimumDeletionInterval_;
        if (deleteInterval_ < minInterval)
        {
            Throw<std::runtime_error>(
                "online_delete must be at least " +
                std::to_string(minInterval));
        }

        if (config.LEDGER_HISTORY > deleteInterval_)
        {
            Throw<std::runtime_error>(
                "online_delete must not be less than ledger_history "
                "(currently " +
                std::to_string(config.LEDGER_HISTORY) + ")");
        }

        state_db_.init(config, dbName_);
        dbPaths();
    }
}

std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(int readThreads)
{
    auto nscfg = app_.config().section(ConfigSection::nodeDatabase());

    // Provide default values:
    if (!nscfg.exists("cache_size"))
        nscfg.set(
            "cache_size",
            std::to_string(app_.config().getValueFor(
                SizedItem::treeCacheSize, std::nullopt)));

    if (!nscfg.exists("cache_age"))
        nscfg.set(
            "cache_age",
            std::to_string(app_.config().getValueFor(
                SizedItem::treeCacheAge, std::nullopt)));

    std::unique_ptr<NodeStore::Database> db;

    if (deleteInterval_)
    {
        SavedState state = state_db_.getState();
        auto writableBackend = makeBackendRotating(state.writableDb);
        auto archiveBackend = makeBackendRotating(state.archiveDb);
        if (!state.writableDb.size())
        {
            state.writableDb = writableBackend->getName();
            state.archiveDb = archiveBackend->getName();
            state_db_.setState(state);
        }

        // Create NodeStore with two backends to allow online deletion of
        // data
        auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
            scheduler_,
            readThreads,
            std::move(writableBackend),
            std::move(archiveBackend),
            nscfg,
            app_.logs().journal(nodeStoreName_));
        fdRequired_ += dbr->fdRequired();
        dbRotating_ = dbr.get();
        db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
    }
    else
    {
        db = NodeStore::Manager::instance().make_Database(
            megabytes(
                app_.config().getValueFor(SizedItem::burstSize, std::nullopt)),
            scheduler_,
            readThreads,
            nscfg,
            app_.logs().journal(nodeStoreName_));
        fdRequired_ += db->fdRequired();
    }
    return db;
}

void
SHAMapStoreImp::onLedgerClosed(std::shared_ptr<Ledger const> const& ledger)
{
    {
        std::lock_guard lock(mutex_);
        newLedger_ = ledger;
        working_ = true;
    }
    cond_.notify_one();
}

void
SHAMapStoreImp::rendezvous() const
{
    if (!working_)
        return;

    std::unique_lock<std::mutex> lock(mutex_);
    rendezvous_.wait(lock, [&] { return !working_; });
}

int
SHAMapStoreImp::fdRequired() const
{
    return fdRequired_;
}

bool
SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node)
{
    // Copy a single record from node to dbRotating_
    dbRotating_->fetchNodeObject(
        node.getHash().as_uint256(),
        0,
        NodeStore::FetchType::synchronous,
        true);
    if (!(++nodeCount % checkHealthInterval_))
    {
        if (healthWait() == stopping)
            return false;
    }

    return true;
}

void
SHAMapStoreImp::run()
{
    beast::setCurrentThreadName("SHAMapStore");
    LedgerIndex lastRotated = state_db_.getState().lastRotated;
    netOPs_ = &app_.getOPs();
    ledgerMaster_ = &app_.getLedgerMaster();
    fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
    treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());

    if (advisoryDelete_)
        canDelete_ = state_db_.getCanDelete();

    while (true)
    {
        healthy_ = true;
        std::shared_ptr<Ledger const> validatedLedger;

        {
            std::unique_lock<std::mutex> lock(mutex_);
            working_ = false;
            rendezvous_.notify_all();
            if (stop_)
            {
                return;
            }
            cond_.wait(lock);
            if (newLedger_)
            {
                validatedLedger = std::move(newLedger_);
            }
            else
                continue;
        }

        LedgerIndex const validatedSeq = validatedLedger->info().seq;
        if (!lastRotated)
        {
            lastRotated = validatedSeq;
            state_db_.setLastRotated(lastRotated);
        }

        bool const readyToRotate =
            validatedSeq >= lastRotated + deleteInterval_ &&
            canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;

        // will delete up to (not including) lastRotated
        if (readyToRotate)
        {
            JLOG(journal_.warn())
                << "rotating  validatedSeq " << validatedSeq << " lastRotated "
                << lastRotated << " deleteInterval " << deleteInterval_
                << " canDelete_ " << canDelete_ << " state "
                << app_.getOPs().strOperatingMode(false) << " age "
                << ledgerMaster_->getValidatedLedgerAge().count() << 's';

            clearPrior(lastRotated);
            if (healthWait() == stopping)
                return;

            JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
            std::uint64_t nodeCount = 0;

            try
            {
                validatedLedger->stateMap().snapShot(false)->visitNodes(
                    std::bind(
                        &SHAMapStoreImp::copyNode,
                        this,
                        std::ref(nodeCount),
                        std::placeholders::_1));
            }
            catch (SHAMapMissingNode const& e)
            {
                JLOG(journal_.error())
                    << "Missing node while copying ledger before rotate: "
                    << e.what();
                continue;
            }

            if (healthWait() == stopping)
                return;
            // Only log if we completed without a "health" abort
            JLOG(journal_.debug()) << "copied ledger " << validatedSeq
                                   << " nodecount " << nodeCount;

            JLOG(journal_.debug()) << "freshening caches";
            freshenCaches();
            if (healthWait() == stopping)
                return;
            // Only log if we completed without a "health" abort
            JLOG(journal_.debug()) << validatedSeq << " freshened caches";

            JLOG(journal_.trace()) << "Making a new backend";
            auto newBackend = makeBackendRotating();
            JLOG(journal_.debug())
                << validatedSeq << " new backend " << newBackend->getName();

            clearCaches(validatedSeq);
            if (healthWait() == stopping)
                return;

            lastRotated = validatedSeq;

            dbRotating_->rotate(
                std::move(newBackend),
                [&](std::string const& writableName,
                    std::string const& archiveName) {
                    SavedState savedState;
                    savedState.writableDb = writableName;
                    savedState.archiveDb = archiveName;
                    savedState.lastRotated = lastRotated;
                    state_db_.setState(savedState);

                    clearCaches(validatedSeq);
                });

            JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
        }
    }
}

void
SHAMapStoreImp::dbPaths()
{
    Section section{app_.config().section(ConfigSection::nodeDatabase())};
    boost::filesystem::path dbPath = get(section, "path");

    if (boost::filesystem::exists(dbPath))
    {
        if (!boost::filesystem::is_directory(dbPath))
        {
            journal_.error()
                << "node db path must be a directory. " << dbPath.string();
            Throw<std::runtime_error>("node db path must be a directory.");
        }
    }
    else
    {
        boost::filesystem::create_directories(dbPath);
    }

    SavedState state = state_db_.getState();

    {
        auto update = [&dbPath](std::string& sPath) {
            if (sPath.empty())
                return false;

            // Check if configured "path" matches stored directory path
            using namespace boost::filesystem;
            auto const stored{path(sPath)};
            if (stored.parent_path() == dbPath)
                return false;

            sPath = (dbPath / stored.filename()).string();
            return true;
        };

        if (update(state.writableDb))
        {
            update(state.archiveDb);
            state_db_.setState(state);
        }
    }

    bool writableDbExists = false;
    bool archiveDbExists = false;

    std::vector<boost::filesystem::path> pathsToDelete;
    for (boost::filesystem::directory_iterator it(dbPath);
         it != boost::filesystem::directory_iterator();
         ++it)
    {
        if (!state.writableDb.compare(it->path().string()))
            writableDbExists = true;
        else if (!state.archiveDb.compare(it->path().string()))
            archiveDbExists = true;
        else if (!dbPrefix_.compare(it->path().stem().string()))
            pathsToDelete.push_back(it->path());
    }

    if ((!writableDbExists && state.writableDb.size()) ||
        (!archiveDbExists && state.archiveDb.size()) ||
        (writableDbExists != archiveDbExists) ||
        state.writableDb.empty() != state.archiveDb.empty())
    {
        boost::filesystem::path stateDbPathName =
            app_.config().legacy("database_path");
        stateDbPathName /= dbName_;
        stateDbPathName += "*";

        journal_.error()
            << "state db error:\n"
            << "  writableDbExists " << writableDbExists << " archiveDbExists "
            << archiveDbExists << '\n'
            << "  writableDb '" << state.writableDb << "' archiveDb '"
            << state.archiveDb << "\n\n"
            << "The existing data is in a corrupted state.\n"
            << "To resume operation, remove the files matching "
            << stateDbPathName.string() << " and contents of the directory "
            << get(section, "path") << '\n'
            << "Optionally, you can move those files to another\n"
            << "location if you wish to analyze or back up the data.\n"
            << "However, there is no guarantee that the data in its\n"
            << "existing form is usable.";

        Throw<std::runtime_error>("state db error");
    }

    // The necessary directories exist. Now, remove any others.
    for (boost::filesystem::path& p : pathsToDelete)
        boost::filesystem::remove_all(p);
}

std::unique_ptr<NodeStore::Backend>
SHAMapStoreImp::makeBackendRotating(std::string path)
{
    Section section{app_.config().section(ConfigSection::nodeDatabase())};
    boost::filesystem::path newPath;

    if (path.size())
    {
        newPath = path;
    }
    else
    {
        boost::filesystem::path p = get(section, "path");
        p /= dbPrefix_;
        p += ".%%%%";
        newPath = boost::filesystem::unique_path(p);
    }
    section.set("path", newPath.string());

    auto backend{NodeStore::Manager::instance().make_Backend(
        section,
        megabytes(
            app_.config().getValueFor(SizedItem::burstSize, std::nullopt)),
        scheduler_,
        app_.logs().journal(nodeStoreName_))};
    backend->open();
    return backend;
}

void
SHAMapStoreImp::clearSql(
    LedgerIndex lastRotated,
    std::string const& TableName,
    std::function<std::optional<LedgerIndex>()> const& getMinSeq,
    std::function<void(LedgerIndex)> const& deleteBeforeSeq)
{
    XRPL_ASSERT(
        deleteInterval_,
        "ripple::SHAMapStoreImp::clearSql : nonzero delete interval");
    LedgerIndex min = std::numeric_limits<LedgerIndex>::max();

    {
        JLOG(journal_.trace())
            << "Begin: Look up lowest value of: " << TableName;
        auto m = getMinSeq();
        JLOG(journal_.trace()) << "End: Look up lowest value of: " << TableName;
        if (!m)
            return;
        min = *m;
    }

    if (min > lastRotated || healthWait() == stopping)
        return;
    if (min == lastRotated)
    {
        // Micro-optimization mainly to clarify logs
        JLOG(journal_.trace()) << "Nothing to delete from " << TableName;
        return;
    }

    JLOG(journal_.debug()) << "start deleting in: " << TableName << " from "
                           << min << " to " << lastRotated;
    while (min < lastRotated)
    {
        min = std::min(lastRotated, min + deleteBatch_);
        JLOG(journal_.trace())
            << "Begin: Delete up to " << deleteBatch_
            << " rows with LedgerSeq < " << min << " from: " << TableName;
        deleteBeforeSeq(min);
        JLOG(journal_.trace())
            << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
            << min << " from: " << TableName;
        if (healthWait() == stopping)
            return;
        if (min < lastRotated)
            std::this_thread::sleep_for(backOff_);
        if (healthWait() == stopping)
            return;
    }
    JLOG(journal_.debug()) << "finished deleting from: " << TableName;
}

void
SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
{
    ledgerMaster_->clearLedgerCachePrior(validatedSeq);
    fullBelowCache_->clear();
}

void
SHAMapStoreImp::freshenCaches()
{
    if (freshenCache(*treeNodeCache_))
        return;
    if (freshenCache(app_.getMasterTransaction().getCache()))
        return;
}

void
SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
{
    // Do not allow ledgers to be acquired from the network
    // that are about to be deleted.
    minimumOnline_ = lastRotated + 1;
    JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to "
                           << lastRotated;
    ledgerMaster_->clearPriorLedgers(lastRotated);
    JLOG(journal_.trace()) << "End: Clear internal ledgers up to "
                           << lastRotated;
    if (healthWait() == stopping)
        return;

    SQLiteDatabase* const db =
        dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase());

    if (!db)
        Throw<std::runtime_error>("Failed to get relational database");

    clearSql(
        lastRotated,
        "Ledgers",
        [db]() -> std::optional<LedgerIndex> { return db->getMinLedgerSeq(); },
        [db](LedgerIndex min) -> void { db->deleteBeforeLedgerSeq(min); });
    if (healthWait() == stopping)
        return;

    if (!app_.config().useTxTables())
        return;

    clearSql(
        lastRotated,
        "Transactions",
        [&db]() -> std::optional<LedgerIndex> {
            return db->getTransactionsMinLedgerSeq();
        },
        [&db](LedgerIndex min) -> void {
            db->deleteTransactionsBeforeLedgerSeq(min);
        });
    if (healthWait() == stopping)
        return;

    clearSql(
        lastRotated,
        "AccountTransactions",
        [&db]() -> std::optional<LedgerIndex> {
            return db->getAccountTransactionsMinLedgerSeq();
        },
        [&db](LedgerIndex min) -> void {
            db->deleteAccountTransactionsBeforeLedgerSeq(min);
        });
    if (healthWait() == stopping)
        return;
}

SHAMapStoreImp::HealthResult
SHAMapStoreImp::healthWait()
{
    auto age = ledgerMaster_->getValidatedLedgerAge();
    OperatingMode mode = netOPs_->getOperatingMode();
    std::unique_lock lock(mutex_);
    while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
    {
        lock.unlock();
        JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
                              << "s for node to stabilize. state: "
                              << app_.getOPs().strOperatingMode(mode, false)
                              << ". age " << age.count() << 's';
        std::this_thread::sleep_for(recoveryWaitTime_);
        age = ledgerMaster_->getValidatedLedgerAge();
        mode = netOPs_->getOperatingMode();
        lock.lock();
    }

    return stop_ ? stopping : keepGoing;
}

void
SHAMapStoreImp::stop()
{
    if (thread_.joinable())
    {
        {
            std::lock_guard lock(mutex_);
            stop_ = true;
            cond_.notify_one();
        }
        thread_.join();
    }
}

std::optional<LedgerIndex>
SHAMapStoreImp::minimumOnline() const
{
    // minimumOnline_ with 0 value is equivalent to unknown/not set.
    // Don't attempt to acquire ledgers if that value is unknown.
    if (deleteInterval_ && minimumOnline_)
        return minimumOnline_.load();
    return app_.getLedgerMaster().minSqlSeq();
}

//------------------------------------------------------------------------------

std::unique_ptr<SHAMapStore>
make_SHAMapStore(
    Application& app,
    NodeStore::Scheduler& scheduler,
    beast::Journal journal)
{
    return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
}

}  // namespace ripple
